DataFrame API এর মাধ্যমে Complex Data Processing গাইড ও নোট

Big Data and Analytics - স্পার্ক এসকিউএল (Spark SQL) - DataFrames এর সাথে কাজ করা
265

Spark SQL-এর DataFrame API একটি শক্তিশালী এবং নমনীয় ইন্টারফেস, যা ডিস্ট্রিবিউটেড ডেটা প্রক্রিয়াকরণের জন্য ব্যবহার করা হয়। DataFrame API ব্যবহার করে Complex Data Processing খুব সহজে করা যায়, যা বিভিন্ন ধরনের ডেটা ট্রান্সফর্মেশন, ফিল্টারিং, অ্যাগ্রিগেশন এবং বিভিন্ন জটিল কোয়ারি প্রক্রিয়া সমর্থন করে।

এখানে আমরা দেখব কীভাবে DataFrame API ব্যবহার করে জটিল ডেটা প্রসেসিং করা যায় এবং কীভাবে বিভিন্ন ধরনের ট্রান্সফর্মেশন এবং অ্যাকশন অপারেশন ব্যবহার করা যায়।


DataFrame API এর মাধ্যমে Complex Data Processing এর ধাপসমূহ

  1. SparkSession তৈরি করা
    প্রথমেই Spark SQL চালানোর জন্য SparkSession তৈরি করতে হয়, যা Spark-এর প্রধান এন্ট্রি পয়েন্ট হিসেবে কাজ করে।

    from pyspark.sql import SparkSession
    
    # SparkSession তৈরি
    spark = SparkSession.builder.appName("Complex Data Processing").getOrCreate()
    
  2. ডেটা লোড করা
    DataFrame তৈরি করতে বিভিন্ন ডেটা সোর্স (যেমন CSV, JSON, Parquet, Hive ইত্যাদি) থেকে ডেটা লোড করা যেতে পারে।

    # JSON ফাইল থেকে DataFrame লোড করা
    df = spark.read.json("path_to_file.json")
    
    # CSV ফাইল থেকে DataFrame লোড করা
    df_csv = spark.read.option("header", "true").csv("path_to_csv_file")
    

Complex Data Processing উদাহরণ

1. Data Filtering (ফিল্টারিং)

DataFrame এর ওপর filter বা where অপারেশন ব্যবহার করে নির্দিষ্ট শর্ত অনুযায়ী ডেটা ফিল্টার করা যায়।

# বয়স ৩০ এর বেশি এমন কর্মচারীদের ফিল্টার করা
filtered_df = df.filter(df['age'] > 30)
filtered_df.show()

এখানে, df['age'] > 30 শর্তটি পূরণ করা কর্মচারীদের ডেটা রিটার্ন করবে।

2. Data Transformation (ট্রান্সফর্মেশন)

DataFrame API-তে select, withColumn, drop, alias ইত্যাদি ব্যবহার করে ডেটা ট্রান্সফর্ম করা যায়।

  • withColumn: নতুন কলাম যোগ বা বিদ্যমান কলামের মান পরিবর্তন করতে।
# নতুন কলাম 'salary_increase' যোগ করা
df_transformed = df.withColumn("salary_increase", df['salary'] * 1.10)
df_transformed.show()
  • select: নির্দিষ্ট কলাম নির্বাচন করতে।
# 'name' এবং 'age' কলাম নির্বাচন করা
selected_df = df.select("name", "age")
selected_df.show()
  • drop: নির্দিষ্ট কলাম বাদ দিতে।
# 'salary' কলাম বাদ দেওয়া
df_dropped = df.drop("salary")
df_dropped.show()

3. Aggregation (অ্যাগ্রিগেশন)

DataFrame API-তে groupBy এবং agg ব্যবহার করে ডেটা গ্রুপ এবং অ্যাগ্রিগেট করা যায়।

# বিভাগ অনুযায়ী গড় বেতন বের করা
agg_df = df.groupBy("department").agg({"salary": "avg"})
agg_df.show()

এখানে, groupBy("department") বিভাগ অনুযায়ী গ্রুপ করে এবং agg({"salary": "avg"}) গড় বেতন বের করে।

4. Joining DataFrames (জয়েনিং)

DataFrame API-তে join ব্যবহার করে একাধিক DataFrame-এর মধ্যে ডেটা মিশ্রিত করা যায়।

# দুটি DataFrame যুক্ত করা, যেখানে department_id মিলবে
joined_df = df1.join(df2, df1.department_id == df2.department_id)
joined_df.show()

এখানে, df1 এবং df2 DataFrame দুটি department_id কলামের মাধ্যমে যুক্ত করা হয়েছে।

5. Sorting (অর্ডারিং)

DataFrame API তে orderBy ব্যবহার করে ডেটা সাজানো যায়।

# বেতন অনুযায়ী কর্মচারীদের সাজানো
sorted_df = df.orderBy(df['salary'], ascending=False)
sorted_df.show()

এখানে, salary কলাম অনুসারে কর্মচারীদের ডেটা নামিয়ে সাজানো হয়েছে।

6. Complex Calculations (জটিল গণনা)

DataFrame API তে গণনা বা কাস্টম লজিক প্রয়োগ করতে expr বা udf (User Defined Function) ব্যবহার করা যায়।

from pyspark.sql.functions import expr

# কর্মচারীদের বেতন ও ট্যাক্স হিসাব করা
df_calculated = df.withColumn("tax", expr("salary * 0.05"))
df_calculated.show()

এখানে, expr("salary * 0.05") ব্যবহার করে বেতন থেকে ৫% ট্যাক্স হিসাব করা হয়েছে।


একাধিক DataFrame-এ কাজ করা

DataFrame API ব্যবহার করে একাধিক ডেটাসেট বা টেবিলের ওপর JOIN, UNION, এবং INTERSECT অপারেশনও করা যেতে পারে।

  • Union: দুইটি DataFrame একত্রিত করার জন্য।
# দুটি DataFrame একত্রিত করা
union_df = df1.union(df2)
union_df.show()
  • Intersect: দুটি DataFrame এর মধ্যে যে ডেটা একসাথে রয়েছে তা বের করা।
# দুটি DataFrame এর কমন ডেটা বের করা
intersect_df = df1.intersect(df2)
intersect_df.show()

সারাংশ

Spark SQL এর DataFrame API ব্যবহার করে Complex Data Processing অনেক সহজ এবং কার্যকরী হয়ে ওঠে। DataFrame API তে বিভিন্ন ধরনের ট্রান্সফর্মেশন যেমন filter, select, withColumn, groupBy এবং join ব্যবহার করে ডেটা পরিসংখ্যান, ফিল্টারিং, গ্রুপিং, এবং জয়েনিং করা যায়। এইসব অপারেশনগুলি লিনিয়ার এবং প্যারালাল প্রসেসিংয়ে দ্রুত কাজ করে, কারণ Spark SQL ডিস্ট্রিবিউটেড কম্পিউটিং পরিবেশে কাজ করে। DataFrame API-তে জটিল গণনা এবং কাস্টম লজিক প্রয়োগ করা সম্ভব, যা ডেটা বিশ্লেষণ এবং বিশ্লেষণাত্মক কাজকে আরও শক্তিশালী করে তোলে।

Content added By
Promotion

Are you sure to start over?

Loading...